Dataframe Repartition

Repartition is the process of movement of data on the basis of some column or expression or random into required number of partitions. This depends on the kind of value/s you are passing which determines how many partitions will be created. You may want to do Repartition when you have understanding of your data and you know how you can improve the performance of dataframe operations by repartitioning it on the basis of some key columns. Also understand that repartition is a costly operation because it requires shuffling of all the data across nodes. You can increase or decrease the number of partitions using “Repartition”.

Let’s see this with an example:

Create DataFrame
val empDF = spark.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
empDF.show

Find DataFrame Partition Size
 

Reparation DataFrame
val empDF_part = empDF.repartition(5)
empDF_part.rdd.partitions.size

So we have repartitioned existing dataframe from 8 partition to 5. The data was “randomly” shuffled to number of partitions required.This can be confirmed from explain plan.

empDF_part.explain()

Similarly, you can also specify the column on the basis of which repartition is required. The data is repartitioned using “HASH” and number of partition will be determined by value set for numpartitions. i.e.spark.sql.shuffle.partitions. Change this value if want different number of partitions.

val empDF_partCol = empDF.repartition($"empno")
empDF_partCol.explain()
spark.sql("set spark.sql.shuffle.partitions").show(false)



No comments:

Post a Comment